package org.itstack.demo.netty.server.common;

import io.netty.channel.*;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * Abstract inbound handler that triggers {@link #sendData(ChannelHandlerContext)} when the
 * channel becomes active, periodically prints the outbound send rate, and clears
 * {@link #sentFlag} when a send promise completes.
 *
 * <p>The send rate is sampled by a dedicated daemon thread that is started in
 * {@link #channelActive(ChannelHandlerContext)} and stopped in
 * {@link #channelInactive(ChannelHandlerContext)} / {@link #handlerRemoved(ChannelHandlerContext)}.
 * Subclasses overriding those callbacks should call {@code super} so the thread is released.
 */
public abstract class MyServerCommonHandler extends SimpleChannelInboundHandler<String> {

    /** Sampling period of the send-rate monitor, in milliseconds. */
    private static final long MONITOR_INTERVAL_MILLIS = 500L;

    /** Number of bytes in one kilobyte, used to convert the sampled byte count. */
    private static final double BYTES_PER_KB = 1024.0;

    /**
     * Send-state flag exposed to subclasses. This class only ever resets it to {@code false}
     * when a promise created by {@link #getChannelProgressivePromise} completes.
     * NOTE(review): presumably subclasses set it to {@code true} before writing — confirm.
     */
    protected boolean sentFlag;

    /** Body of the monitor thread; created when the handler is added to a pipeline. */
    private Runnable counterTask;

    /** Progress accumulated since the last sample taken by the monitor thread. */
    private final AtomicLong consumeMsgLength = new AtomicLong();

    /** The running monitor thread, or {@code null} when monitoring is not active. */
    private Thread monitorThread;

    /**
     * Prepares the send-rate monitoring task and then delegates to the superclass.
     *
     * @param ctx the context of this handler
     * @throws Exception if the superclass callback fails
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        counterTask = this::monitorSendRate;

        super.handlerAdded(ctx);
    }

    /**
     * Starts sending data and then starts the send-rate monitor thread.
     *
     * <p>NOTE(review): as in the original code, the {@code channelActive} event is not
     * propagated further down the pipeline — confirm this is intended.
     *
     * @param ctx the context of this handler
     * @throws Exception if sending fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        sendData(ctx);
        // Start the monitoring thread.
        startMonitor();
    }

    /**
     * Stops the monitor thread once the channel is no longer active, then propagates the event.
     *
     * @param ctx the context of this handler
     * @throws Exception if the superclass callback fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        stopMonitor();
        super.channelInactive(ctx);
    }

    /**
     * Stops the monitor thread when the handler is removed from its pipeline; this also covers
     * subclasses that override {@code channelInactive} without calling {@code super}.
     *
     * @param ctx the context of this handler
     * @throws Exception if the superclass callback fails
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        stopMonitor();
        super.handlerRemoved(ctx);
    }

    /**
     * Sends data over the channel; invoked once when the channel becomes active.
     *
     * @param ctx the context of this handler
     */
    protected abstract void sendData(ChannelHandlerContext ctx);

    /**
     * Creates a progressive promise whose listener feeds the send-rate counter and resets
     * {@link #sentFlag} on completion.
     *
     * @param ctx             the context used to create the promise
     * @param completedAction optional callback invoked only when the operation succeeds;
     *                        may be {@code null}
     * @return a new promise with the monitoring listener already attached
     */
    protected ChannelProgressivePromise getChannelProgressivePromise(ChannelHandlerContext ctx, Consumer<ChannelProgressiveFuture> completedAction) {
        ChannelProgressivePromise channelProgressivePromise = ctx.newProgressivePromise();
        channelProgressivePromise.addListener(new ChannelProgressiveFutureListener() {
            // Progress already accounted for by THIS promise. Keeping it per listener (instead of
            // one shared field adjusted by a fixed amount) makes the delta correct for any payload
            // size, for failed writes, and for several promises in flight.
            private long priorProgress;

            @Override
            public void operationProgressed(ChannelProgressiveFuture channelProgressiveFuture, long progress, long total) throws Exception {
                consumeMsgLength.addAndGet(progress - priorProgress);
                priorProgress = progress;
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture channelProgressiveFuture) throws Exception {
                sentFlag = false;
                if (channelProgressiveFuture.isSuccess()) {
                    System.out.println("发送消息成功！");
                    if (completedAction != null) {
                        completedAction.accept(channelProgressiveFuture);
                    }
                } else {
                    System.out.println("发送消息失败！");
                    channelProgressiveFuture.cause().printStackTrace();
                }
            }
        });

        return channelProgressivePromise;
    }

    /**
     * Prints every received message to standard output.
     *
     * @param channelHandlerContext the context of this handler
     * @param msg                   the received message
     * @throws Exception never thrown by this implementation
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println("Netty Server 接收到消息：" + msg);
    }

    /** Starts the monitor thread unless one is already running or no task has been prepared. */
    private void startMonitor() {
        if (monitorThread != null || counterTask == null) {
            return;
        }
        Thread thread = new Thread(counterTask, "send-rate-monitor");
        // Daemon so that a missed stop can never keep the JVM alive.
        thread.setDaemon(true);
        monitorThread = thread;
        thread.start();
    }

    /** Interrupts the monitor thread, if any, so that its loop terminates. */
    private void stopMonitor() {
        Thread thread = monitorThread;
        if (thread != null) {
            monitorThread = null;
            thread.interrupt();
        }
    }

    /**
     * Monitor loop: every {@link #MONITOR_INTERVAL_MILLIS} ms drains the progress counter and,
     * when something was sent, prints the rate. Exits as soon as the thread is interrupted.
     */
    private void monitorSendRate() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(MONITOR_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop: interruption is the shutdown signal.
                Thread.currentThread().interrupt();
                return;
            }
            long length = consumeMsgLength.getAndSet(0);
            if (0 == length) {
                continue;
            }
            // Convert "progress per sampling window" to KB per second so the value matches
            // the printed unit. Assumes progress is a byte count — TODO confirm for the
            // message types written by subclasses.
            double kbPerSecond = length * 1000.0 / MONITOR_INTERVAL_MILLIS / BYTES_PER_KB;
            System.out.println("数据发送速率(KB/s): " + String.format("%.2f", kbPerSecond));
        }
    }
}
